// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------
// Modification history:
// feature: authentication-free shared key and token derivation between clients
// feature: change E2E packet format
// feature: DServer support simple entity
// feature: support auth token
// feature: E2E New Requirement Development
// feature: Development of new requirements for E2E functionality
// ------------------------------------------------------------------

#ifndef INCLUDE_EDDS_RTPS_MESSAGES_RTPSMESSAGEGROUP_H_
#define INCLUDE_EDDS_RTPS_MESSAGES_RTPSMESSAGEGROUP_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <edds/rtps/messages/RTPSMessageCreator.h>
#include <edds/rtps/common/FragmentNumber.h>

#include <cassert>
#include <chrono>
#include <cstdint>
#include <limits>
#include <memory>
#include <set>
#include <stdexcept>
#include <vector>
#include <ertps/types/TypesBase.h>
#include <edds/rtps/messages/RTPSMessageSenderInterface.hpp>

#if !defined(WIN32)
#include <tracepoint/TracePointManager.hpp>
#endif

namespace evbs {
namespace ertps {
namespace rtps {

// Domain-id type and the DOMAIN_ID_UNKNOWN constant, both used by RTPSMessageGroup
// (as a constructor parameter type and as the default value of a data member).
using vbs::common::types::DOMAIN_ID_UNKNOWN;
using vbs::common::types::DomainId_t;

// Forward declarations: this header only refers to these types through pointers,
// so their full definitions are not required here.
class RTPSParticipantImpl;
class Endpoint;
class RTPSMessageGroup_t;

/**
 * RTPSMessageGroup Class used to construct a RTPS message.
 *
 * Submessages are appended with the add_xxx() methods; flush_and_reset() is the public entry
 * point to use when the destinations change between two add_xxx() calls.
 *
 * NOTE(review): the class declares a destructor and holds raw buffer pointers, yet the copy
 * operations are left implicitly generated. Confirm that instances are never copied.
 *
 * @ingroup WRITER_MODULE
 */
class RTPSMessageGroup {
 public:
    /*!
     * Exception thrown when an operation exceeds the maximum blocking time.
     */
    class timeout : public std::runtime_error {
     public:
        timeout() : std::runtime_error("timeout") {}

        virtual ~timeout() = default;
    };

    /*!
     * Exception thrown when an operation exceeds the maximum bytes this object can process in the
     * current period of time.
     */
    class limit_exceeded : public std::runtime_error {
     public:
        limit_exceeded() : std::runtime_error("limit_exceeded") {}

        virtual ~limit_exceeded() = default;
    };

    /**
     * Basic constructor.
     * Constructs a RTPSMessageGroup allowing to allocate its own buffer.
     * @param participant Pointer to the participant sending data.
     * @param internal_buffer true indicates this object to allocate its own buffer. false indicates
     * to get a buffer from the participant.
     */
    RTPSMessageGroup(RTPSParticipantImpl* participant, bool internal_buffer = false);

    /**
     * Basic constructor.
     * Constructs a RTPSMessageGroup allowing the destination endpoints to change.
     * @param participant Pointer to the participant sending data.
     * @param endpoint Pointer to the endpoint sending data.
     * @param msg_sender Pointer to message sender interface.
     */
    RTPSMessageGroup(RTPSParticipantImpl* participant, Endpoint* endpoint, RTPSMessageSenderInterface* msg_sender);

    /**
     * Constructor taking an additional source description.
     * @param participant Pointer to the participant sending data.
     * @param endpoint Pointer to the endpoint sending data.
     * @param msg_sender Pointer to message sender interface.
     * @param add_info_src Presumably requests that source information is added to the message
     * (see add_info_src_) - TODO confirm against the implementation.
     * @param src_guid_prefix GUID prefix of the source, presumably only meaningful when
     * add_info_src is true - TODO confirm.
     */
    RTPSMessageGroup(RTPSParticipantImpl* participant, Endpoint* endpoint, RTPSMessageSenderInterface* msg_sender,
                     bool add_info_src, GuidPrefix_t src_guid_prefix);

    /**
     * Constructor taking an additional source description and a domain identifier.
     * @param participant Pointer to the participant sending data.
     * @param endpoint Pointer to the endpoint sending data.
     * @param msg_sender Pointer to message sender interface.
     * @param add_info_src Presumably requests that source information is added to the message
     * (see add_info_src_) - TODO confirm against the implementation.
     * @param src_guid_prefix GUID prefix of the source, presumably only meaningful when
     * add_info_src is true - TODO confirm.
     * @param domain_id Domain identifier associated with this group.
     */
    RTPSMessageGroup(RTPSParticipantImpl* participant, Endpoint* endpoint, RTPSMessageSenderInterface* msg_sender,
                     bool add_info_src, GuidPrefix_t src_guid_prefix, DomainId_t domain_id);
    ~RTPSMessageGroup();

#if HAVE_SECURITY
    /**
     * Only available when HAVE_SECURITY is enabled.
     * NOTE(review): presumably obtains the authentication token for the given endpoint and
     * sequence number (see m_token) - confirm against the implementation.
     * @param endpoint Pointer to the endpoint the token is requested for.
     * @param seq Sequence value associated with the request.
     */
    void get_authtoken(Endpoint* endpoint, uint64_t seq);
#endif

    /**
     * Adds a DATA message to the group.
     * @param change Reference to the cache change to send.
     * @param enable_e2e True when e2e protection is enabled.
     * @param e2e_profile04 E2E profile data, presumably only used when enable_e2e is true - TODO
     * confirm.
     * @return True when message was added to the group.
     */
    bool add_data(CacheChange_t& change, bool enable_e2e, E2EProfile04_t& e2e_profile04);

    /**
     * Adds a DATA_FRAG message to the group.
     * @param change Reference to the cache change to send.
     * @param fragment_number Index (1 based) of the fragment to send.
     * @param enable_e2e True when e2e protection is enabled.
     * @param e2e_profile04 E2E profile data, presumably only used when enable_e2e is true - TODO
     * confirm.
     * @return True when message was added to the group.
     */
    bool add_data_frag(CacheChange_t& change, const uint32_t fragment_number, bool enable_e2e,
                       E2EProfile04_t& e2e_profile04);

    /**
     * Adds a HEARTBEAT message to the group.
     * @param first_seq First available sequence number.
     * @param last_seq Last available sequence number.
     * @param count Counting identifier.
     * @param is_final Should final flag be set?
     * @param liveliness_flag Should liveliness flag be set?
     * @return True when message was added to the group.
     */
    bool add_heartbeat(const SequenceNumber_t& first_seq, const SequenceNumber_t& last_seq, Count_t count,
                       bool is_final, bool liveliness_flag);

    /**
     * Adds one or more GAP messages to the group.
     * @param changes_seq_numbers Set of missed sequence numbers.
     * @return True when messages were added to the group.
     */
    bool add_gap(std::set<SequenceNumber_t>& changes_seq_numbers);

    /**
     * Adds one GAP message to the group.
     * @param gap_initial_sequence Start of consecutive sequence numbers.
     * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
     * @return True when message was added to the group.
     */
    bool add_gap(const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap);

    /**
     * Adds one GAP message to the group.
     * @param gap_initial_sequence Start of consecutive sequence numbers.
     * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
     * @param reader_guid GUID of the destination reader.
     * @return True when message was added to the group.
     */
    bool add_gap(const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap,
                 const GUID_t& reader_guid);

    /**
     * Adds a ACKNACK message to the group.
     * @param seq_num_set Set of missing sequence numbers.
     * @param count Counting identifier.
     * @param final_flag Should final flag be set?
     * @return True when message was added to the group.
     */
    bool add_acknack(const SequenceNumberSet_t& seq_num_set, int32_t count, bool final_flag);

    /**
     * Adds a NACKFRAG message to the group.
     * @param seq_number Sequence number being nack'ed.
     * @param fn_state Set of missing fragment numbers.
     * @param count Counting identifier.
     * @return True when message was added to the group.
     */
    bool add_nackfrag(const SequenceNumber_t& seq_number, FragmentNumberSet_t fn_state, int32_t count);

    /**
     * To be used whenever destination locators/guids change between two add_xxx calls.
     * Automatically called inside add_xxx calls if destinations_have_changed() method of
     * RTPSMessageSenderInterface returns true.
     * May become private again with a refactor of RTPSMessageSenderInterface, adding a
     * group_has_been_flushed() method.
     */
    void flush_and_reset();

    /*!
     * Change dynamically the sender of next RTPS submessages.
     * If either pointer differs from the one currently stored, flush_and_reset() is called
     * before the new values are stored.
     *
     * @param endpoint Pointer to next Endpoint sender. nullptr resets object to initial state.
     * @param msg_sender Pointer to the RTPSMessageSenderInterface that will be used to send next
     * RTPS messages. nullptr resets object to initial state.
     * @pre (endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender ==
     * nullptr)
     */
    void sender(Endpoint* endpoint, RTPSMessageSenderInterface* msg_sender) {
        // Both pointers must be set together or cleared together (checked in debug builds only).
        assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
        // Flush before switching, while endpoint_/sender_ still hold the previous values.
        if ((endpoint != endpoint_) || (msg_sender != sender_)) {
            flush_and_reset();
        }

        endpoint_ = endpoint;
        sender_ = msg_sender;
    }

    //! Maximum fragment size minus the headers
    static inline constexpr uint32_t get_max_fragment_payload_size() {
        // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment)
        // With the constants declared in this class: 65535 - 28 - 32 - 3 = 65472 bytes.
        return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3U;
    }

    //! Stores the limit on sent bytes (see limit_exceeded).
    void set_sent_bytes_limitation(uint32_t limit) { sent_bytes_limitation_ = limit; }

    //! Resets the counter of bytes already sent to zero.
    void reset_current_bytes_processed() { current_sent_bytes_ = 0U; }

    //! Bytes already sent plus the length of the message currently being built.
    //! NOTE(review): dereferences full_msg_ unconditionally; assumes every constructor sets it
    //! to a valid message - confirm against the implementation.
    inline uint32_t get_current_bytes_processed() const { return current_sent_bytes_ + full_msg_->length; }

 private:
    //! Header size subtracted in get_max_fragment_payload_size().
    static constexpr uint32_t data_frag_header_size_ = 28U;
    //! Inline QoS size subtracted in get_max_fragment_payload_size().
    static constexpr uint32_t max_inline_qos_size_ = 32U;

    inline void reset_to_header();

    inline void flush();

    inline void send();

    inline bool check_and_maybe_flush(const GuidPrefix_t& destination_guid_prefix, bool is_big_submessage);

    bool add_info_dst_in_buffer(CDRMessage_t* buffer, const GuidPrefix_t& destination_guid_prefix);

    bool add_info_ts_in_buffer(const Time_t& timestamp);

    bool add_info_e2e_in_buffer(const EntityId_t& readerId, CacheChange_t& change);

    bool create_gap_submessage(const SequenceNumber_t& gap_initial_sequence, const SequenceNumberSet_t& gap_bitmap,
                               const EntityId_t& reader_id);
#if HAVE_SECURITY
    bool add_auth_token_in_buffer(const GuidPrefix_t& dst, uint32_t token);
    std::vector<uint8_t> key_derivation(const std::vector<uint8_t>& key, const GuidPrefix_t& src,
                                        const GuidPrefix_t& dst);
    bool add_shared_key_in_buffer(const std::vector<uint8_t>& key, const GuidPrefix_t& matched_src,
                                  const GuidPrefix_t& matched_dst);
#endif

    //! Message sender interface currently in use; replaced by sender().
    RTPSMessageSenderInterface* sender_ = nullptr;

    //! Endpoint currently sending; replaced by sender().
    Endpoint* endpoint_ = nullptr;

    //! Message whose length is reported by get_current_bytes_processed().
    CDRMessage_t* full_msg_ = nullptr;

    GuidPrefix_t current_dst_;

    RTPSParticipantImpl* participant_ = nullptr;

#if HAVE_SECURITY

    CDRMessage_t* encrypt_msg_ = nullptr;

#endif  // if HAVE_SECURITY

    std::chrono::steady_clock::time_point max_blocking_time_point_;

    bool max_blocking_time_is_set_ = false;

    // Initialized like every other raw pointer member so it never holds an indeterminate value.
    RTPSMessageGroup_t* send_buffer_ = nullptr;

    bool internal_buffer_ = false;

    //! Value stored by set_sent_bytes_limitation().
    uint32_t sent_bytes_limitation_ = 0U;

    //! Counter cleared by reset_current_bytes_processed().
    uint32_t current_sent_bytes_ = 0U;

#if HAVE_SECURITY
    uint64_t m_token {};
#endif
    bool add_info_src_ = false;
    GuidPrefix_t src_guid_prefix_ = GuidPrefix_t::unknown();
    DomainId_t domain_id_ = DOMAIN_ID_UNKNOWN;
};

} /* namespace rtps */
} /* namespace ertps */
} /* namespace evbs */

#endif  // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif  // INCLUDE_EDDS_RTPS_MESSAGES_RTPSMESSAGEGROUP_H_
